「ちょい見せ」で、お菓子の種類を推論してみました
1 はじめに
CX事業本部の平内(SIN)です。
機械学習の推論には、ある程度の時間がかかるため、動画で高速なFPSを出すのは、比較的難しいと思います。
今回は、対象の商品を一瞬だけカメラに見せて「ちょい見せ」、推論する仕組みを検討してみました。
使用したのは、RaspberryPi 4とRaspberryPiカメラモジュール、そしてIntel NeuralStick2(NCS2)です。
最初に、動作している様子です。
2 モデル
モデルは、Amazon SageMakerの組み込みアルゴリズム(画像分類)で作成したものです。
OpenVINOツールキットで使用可能なように、Model OptimizerでIR(中間表現フォーマット)に変換しました。
3 90fpsで撮影
RaspberryPiのカメラモジュールは、VGA(640x480)で90fpsの撮影が可能です。
参考:Raspberry Pi カメラモジュール V2
「ちょい見せ」で、可能な限り鮮明な画像を取得できるように、このカメラを使用して、撮影に専念するスレッドを作成しました。
このスレッドでは、前フレームとの差分を常に計算し、変化がある間だけ画像を保存しています。 変化が始まると、保存が開始され、変化が終わった時、保存された画像のうち時間的に真ん中に位置する画像を取り出し「推論」を行っています。
4 コード
作成したコードは、以下のとおりです。
主要なクラスは以下のようになっています。
- Video Webカメラを処理するクラス
- Images 差分が生じている間に撮影画像を蓄積するクラス、中央画像を取り出すと、初期化されます
- Syohin40 商品識別モデルをラップするクラス、内部で入力形式への変換を行います
- Manager 状態遷移を管理するクラス、IDLE(待機) => SVAE(画像保存) => START(推論開始) => INFER(推論中)と遷移します
from numpy import asarray from dlr import DLRModel from PIL import Image import os import time import numpy as np import cv2 from dispFps import DispFps import threading from model import Model from openvino.inference_engine import IECore CLASSES = ["ポリッピー(GREEN)","OREO","カントリーマム","ポリッピー(RED)","柿の種(わさび)" ,"通のとうもろこし","CHEDDER_CHEESE","ピーナッツ","ストーンチョコ","PRETZEL(YELLOW)" ,"海味鮮","柿の種","カラフルチョコ","フルグラ(BROWN)","NOIR" ,"BANANA(BLOWN)","チーズあられ","俺のおやつ","PRIME","CRATZ(RED)" ,"CRATZ(GREEN)","揚一番","ポリッピー(YELLOW)","こつぶっこ","アスパラガス" ,"海苔ピーパック","いちご","梅しそチーズあられ","通のえだ豆","柿の種(梅しそ)" ,"PRETZEL(BLACK)","辛子明太子","CRATZ(ORANGE)","チョコメリゼ","フライドポテト(じゃがバター味)" ,"BANANA(BLUE)","でん六豆","パズル","フルグラ(RED)","PRETZEL(GREEN)" ,"フライドポテト(しお味)",] # Webカメラを処理するクラス class Video(): def __init__(self): WIDTH = 640 HEIGHT = 480 FPS = 90 self.__cap = cv2.VideoCapture(0) self.__cap.set(cv2.CAP_PROP_FRAME_WIDTH, WIDTH) self.__cap.set(cv2.CAP_PROP_FRAME_HEIGHT, HEIGHT) self.__cap.set(cv2.CAP_PROP_FPS, FPS) def __del__(self): self.__cap.release() cv2.destroyAllWindows() def read(self): return self.__cap.read() # 差分が生じている間に撮影画像を蓄積するクラス class Images(): def __init__(self, dir): self.__dir = dir os.makedirs(self.__dir, exist_ok=True) self.__saveCounter = 0 def __fileName(self, index): return "{}/{}.jpg".format(self.__dir, index) def save(self, frame): cv2.imwrite(self.__fileName(self.__saveCounter), frame) print("saved.") self.__saveCounter += 1 def get(self): # 撮影した画像の中心部分を使用する center = int(self.__saveCounter/2) image = cv2.imread(self.__fileName(center)) # 取得した時点で蓄積画像は削除する for i in range(self.__saveCounter): os.remove(self.__fileName(i)) self.__saveCounter = 0 return image # 商品識別モデル class Syohin40(Model): def __init__(self, model): ie = IECore() device = "MYRIAD" super().__init__(ie, device, model) _, _, h, w = self.input_size self.__input_height = h self.__input_width = w def __prepare_frame(self, frame): initial_h, initial_w = frame.shape[:2] scale_h, scale_w = initial_h / float(self.__input_height), initial_w / float(self.__input_width) in_frame = cv2.resize(frame, (self.__input_width, self.__input_height)) in_frame = in_frame.transpose((2, 0, 1)) in_frame = in_frame.reshape(self.input_size) return in_frame, scale_h, scale_w def infer(self, frame): in_frame, _, _ = self.__prepare_frame(frame) result = super().infer(in_frame) return result.squeeze() # 状態遷移を管理するクラス class Manager(): def __init__(self, previous): self.__previous = previous self.__status = "IDLE" # IDLE => SVAE => START => INFER self.__threshold = 1000 # 変化の敷居値 self.__span = 3 # 静止間隔 self.__counter = 0 def check(self, frame): diff = self.__getDiff(self.__previous, frame) self.__previous = frame if(self.__threshold < diff): self.__counter = 0 else: self.__counter += 1 if(self.__status == "IDLE"): if(self.__threshold < diff): self.__status = "SAVE" elif(self.__status == "SAVE"): print("diff:{}".format(diff)) if(self.__span < self.__counter): self.__status = "START" @property def Status(self): return self.__status @Status.setter def Status(self, status): self.__status = status # 差分を数値化 def __getDiff(self, img1, img2): # グレースケール変換 img1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) img2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) # 差分取得 mask = cv2.absdiff(img1, img2) # 2値化 _, mask = cv2.threshold(mask, 50, 255, cv2.THRESH_BINARY) diff = cv2.countNonZero(mask) # 白の要素数 mask = cv2.resize(mask, (320, 240)) cv2.imshow('mask', mask) return diff class Main(): def __init__(self): self.__images = Images("./tmpDir") self.__syohin40 = Syohin40("image-classification-0012") self.__video = Video() _, previous = self.__video.read() self.__manager = Manager(previous) def __infer(self): # 撮影した画像の中央画像を取得する image = self.__images.get() # 推論 start = time.time() out = self.__syohin40.infer(image) processing_time = time.time() - start print("processing_time {} sec".format(processing_time)) # 結果表示 prob = np.max(out) index = np.argmax(out) print("--------------------------------") print("Class: %s, probability: %f" % (CLASSES[index], prob)) print("--------------------------------") self.__manager.Status = "IDLE" def start(self): while(True): _, frame = self.__video.read() self.__manager.check(frame) if(self.__manager.Status == "SAVE"): self.__images.save(frame) elif(self.__manager.Status == "START"): self.__manager.Status = "INFER" infer = threading.Thread(target=self.__infer) infer.start() cv2.imshow('frame', frame) if cv2.waitKey(1) & 0xFF == ord('q'): break del self.__video main = Main() main.start()
model.py
class Model: def __init__(self, ie, device, model): modelPath = "./FP32/{}".format(model) if(device=="MYRIAD"): modelPath = "./FP16/{}".format(model) net = ie.read_network(modelPath + ".xml", modelPath + ".bin") self.exec_net = ie.load_network(network=net, device_name=device, num_requests=2) self.input_name = next(iter(net.input_info)) self.output_name = next(iter(net.outputs)) self.input_size = net.input_info[self.input_name].input_data.shape self.output_size = net.outputs[self.output_name].shape def infer(self, data): input_data = {self.input_name: data} infer_result = self.exec_net.infer(input_data) return infer_result[self.output_name]
5 最後に
今回は、可能な限り高速で撮影し、「ちょい見せ」での推論が、どれぐらい可能かを試してみました。
もう少し、推論処理に時間をかけられるなら、1枚の画像でなく、複数の画像で推論し、その合計値で判定すると、もっと精度が上がるかも知れません。